feat: update replicator retry logic #66
7c396be
to
09249ba
Compare
@@ -205,39 +198,6 @@ async fn main() -> Result<(), String> {
    Ok(())
}

fn start_queue_monitor(queue: Arc<Receiver<TransferRequest>>) -> JoinHandle<Result<(), String>> {
Why are we removing this?
It is not really needed. This was useful when the replicator was part of `cohort-sdk`. I forgot to remove it in the earlier PR, which was merged to master last week.
This one observes how quickly cohort drains the generated queue. It is not related to the replicator, so I would prefer it to stay :) Previously the monitor printed stats for both the generator queue and the replicator queue (via the heartbeat channel); now it observes only the generator. Btw, if we remove it, I am not sure how the system will stop itself: the async services run forever and don't know when to stop.
Aaah okay, I must have got confused with the earlier logic. Will add this back.
The app stops correctly even after I remove it. In fact, one reason I removed it was that the monitor would sometimes stop the app prematurely, before it reached the target number of `txn`. There is probably a bug somewhere in there, which I didn't dig into in detail; I just deleted the code :)
That can happen when the generated queue becomes empty while some work is still queued on the replicator side. The monitor does not shut down immediately; it waits for a number of seconds. Of course, if that wait period is not long enough, the app can close too early. It is a matter of giving it more time to wait :)
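To make the grace-period behaviour concrete, here is a minimal sketch of the idea, not the project's actual monitor. `IdleMonitor`, `grace` and `observe` are hypothetical names, and the sketch assumes the caller polls a pending-work count on a timer. Shutdown is signalled only after the count has stayed at zero for the whole grace period, and any new work resets the timer.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of this PR): tracks how long the
/// observed queue has been continuously empty.
pub struct IdleMonitor {
    /// How long the queue must stay empty before shutdown is signalled.
    grace: Duration,
    /// Time of the first observation in the current run of empty readings.
    empty_since: Option<Instant>,
}

impl IdleMonitor {
    pub fn new(grace: Duration) -> Self {
        Self { grace, empty_since: None }
    }

    /// Records one observation of the pending-work count taken at `now`.
    /// Returns true once the count has been zero for at least `grace`.
    pub fn observe(&mut self, pending: usize, now: Instant) -> bool {
        if pending > 0 {
            // Work is still queued, so restart the idle timer.
            self.empty_since = None;
            return false;
        }
        // Remember when the queue first became empty, then compare.
        let since = *self.empty_since.get_or_insert(now);
        now.duration_since(since) >= self.grace
    }
}
```

If `pending` counted only the generator queue, a short `grace` would reproduce the early shutdown described above. Passing the sum of generator and replicator backlog (assuming both are observable) would make the check less sensitive to the chosen wait time.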
Codecov Report
Patch coverage has no change and project coverage change:
Additional details and impacted files
@@            Coverage Diff             @@
##           master      #66      +/-   ##
==========================================
- Coverage   61.88%   61.73%   -0.16%
==========================================
  Files          88       89       +1
  Lines        4909     4921      +12
==========================================
  Hits         3038     3038
- Misses       1871     1883      +12
loop {
    let tx = cnn.transaction().await.map_err(|e| e.to_string())?;
    if !statemap.is_empty() {
        let updated_rows_res = BankStatemapInstaller::update_bank_transfer_request(&tx, &statemap, snapshot_version).await;
I suggest renaming this method, `update_bank_transfer_request`. The name reads as if we are updating the request, whereas we are actually installing the given statemap.
Will update this.
@@ -11,13 +14,72 @@ pub struct StatemapInstallerConfig {
    pub thread_pool: Option<u16>,
}

async fn statemap_install_future(
RE: the method name ending in `_future`. What does it mean? When reading it, it is not straightforward to understand what will happen when the method is called.
Isn't `future` pretty self-explanatory in this context?
To me it was obvious, from its use in the tokio task, that this `fn` is the future the task expects.
To me a method name is about "do something", "get something", "parse something", "create something", etc. From that perspective I don't know what `_future` does here. Why do we need to encode a data type into the method name? :)
const SNAPSHOT_UPDATE_QUERY: &str = r#"UPDATE cohort_snapshot SET "version" = ($1)::BIGINT WHERE id = $2 AND "version" < ($1)::BIGINT"#;

fn is_retriable_error(error: &str) -> bool {
    // TODO: improve retriable error detection when error moves from String to enum or struct.
Agreed, this matches on Postgres-specific error text. It should somehow be abstracted per DB type.
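One possible shape for that abstraction, as a rough sketch only: `InstallError` and `classify_postgres` are hypothetical names, not types from this repository. It assumes the Postgres layer can expose the SQLSTATE code, and it treats `40001` (serialization_failure) and `40P01` (deadlock_detected) as the retriable cases. The retry logic then depends only on the generic enum, and each DB type supplies its own mapping.

```rust
/// Hypothetical database-agnostic error type (an assumption for
/// illustration, not the project's actual error type).
#[derive(Debug, Clone, PartialEq)]
pub enum InstallError {
    /// A transient conflict was reported; retrying may succeed.
    Retriable(String),
    /// Any other failure; retrying is not expected to help.
    Fatal(String),
}

/// Maps a Postgres SQLSTATE code to the generic error type.
/// 40001 = serialization_failure, 40P01 = deadlock_detected.
pub fn classify_postgres(sqlstate: &str, message: &str) -> InstallError {
    match sqlstate {
        "40001" | "40P01" => InstallError::Retriable(message.to_string()),
        _ => InstallError::Fatal(message.to_string()),
    }
}

/// DB-independent check used by the retry loop: no string matching needed.
pub fn is_retriable_error(error: &InstallError) -> bool {
    matches!(error, InstallError::Retriable(_))
}
```

Whether those two codes are the right set for the replicator is a judgment call; the point is only that the Postgres knowledge sits in one mapping function.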
let mut cnn = self.database.get().await.map_err(|e| e.to_string())?;
let tx = cnn.transaction().await.map_err(|e| e.to_string())?;
impl BankStatemapInstaller {
    async fn install_bank_transfer_statemap(tx: &Transaction<'_>, statemap: &[StatemapItem], _snapshot_version: u64) -> Result<u64, String> {
thanks
);
replicator_tx
    .send(ReplicatorChannel::InstallationFailure(format!(
        "Installed failed for version={version:?} error={err:?}"
Is it a typo in "Installed"?
Yup, will update this.
Looks good. Left minor typo warnings.
No description provided.